
Refactor internal buffer chain in the memory queue #37795

Merged (15 commits) on Feb 13, 2024

Conversation


@faec faec commented Jan 30, 2024

Proposed commit message

  • Refactor the buffered and unbuffered memory queue implementations into a single object
  • Use a ring buffer for both cases (previously the buffered case used a linked list of independent buffers)
  • Simplify the behavior of queue.mem.flush.min_events so it can't starve output workers when enough data is ready (previously it interacted in complicated ways with bulk_max_size; now it serves as a simple maximum on allowable event batch size).
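The min_events change can be illustrated with a toy Python model. This is not the actual Go implementation in libbeat; all names and behavior here are illustrative assumptions. A single fixed-size ring buffer backs the queue, and flush.min_events acts only as a cap on batch size, so a consumer is never blocked waiting for a minimum to accumulate.

```python
class MemQueue:
    """Toy ring-buffer queue. Illustrative only; not libbeat's API."""

    def __init__(self, capacity, flush_min_events):
        self.buf = [None] * capacity   # fixed-size ring buffer backing store
        self.capacity = capacity
        self.head = 0                  # index of the oldest queued event
        self.count = 0                 # number of events currently queued
        self.flush_min_events = flush_min_events

    def publish(self, event):
        if self.count == self.capacity:
            return False               # queue full: caller must retry
        self.buf[(self.head + self.count) % self.capacity] = event
        self.count += 1
        return True

    def get_batch(self, requested):
        # flush.min_events is a simple upper bound on batch size; whatever
        # is available (up to the cap) is handed out immediately, so output
        # workers are not starved while events are ready.
        n = min(self.count, requested, self.flush_min_events)
        batch = [self.buf[(self.head + i) % self.capacity] for i in range(n)]
        self.head = (self.head + n) % self.capacity
        self.count -= n
        return batch
```

In this model a full queue rejects writes until a batch is drained, and every read advances the ring in place, which is the structural simplification over a linked list of independent buffers.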

Checklist

  • My code follows the style guidelines of this project
  • I have commented my code, particularly in hard-to-understand areas
  • I have made corresponding changes to the documentation
  • I have made corresponding changes to the default configuration files
  • I have added tests that prove my fix is effective or that my feature works
  • I have added an entry in CHANGELOG.next.asciidoc or CHANGELOG-developer.next.asciidoc.

How to test this PR locally

Test any ingestion while varying queue-related parameters, most importantly queue.mem.flush.min_events, queue.mem.flush.timeout, and output.elasticsearch.bulk_max_size.
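For instance, a test run might use a configuration along these lines (the specific values are arbitrary illustrations, not recommendations):

```yaml
queue.mem:
  events: 4096
  flush:
    min_events: 512
    timeout: 5s

output.elasticsearch:
  hosts: ["localhost:9200"]
  bulk_max_size: 1600
```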

Related issues

@faec faec added bug Team:Elastic-Agent Label for the Agent team labels Jan 30, 2024
@faec faec self-assigned this Jan 30, 2024
@botelastic botelastic bot added needs_team Indicates that the issue/PR needs a Team:* label and removed needs_team Indicates that the issue/PR needs a Team:* label labels Jan 30, 2024

mergify bot commented Jan 30, 2024

This pull request does not have a backport label.
If this is a bug or security fix, could you label this PR @faec? 🙏 To do so, you'll need to label your PR with:

  • The upcoming major version of the Elastic Stack
  • The upcoming minor version of the Elastic Stack (if you're not pushing a breaking change)

To fix up this pull request, add the backport labels for the needed
branches, such as:

  • backport-v8.\d.0 automatically backports to the 8.\d branch (\d is a digit)

@elasticmachine

❕ Build Aborted

Either there was a build timeout or someone aborted the build.


Build stats

  • Start Time: 2024-01-30T22:53:51.880+0000

  • Duration: 364 min 41 sec

Test stats 🧪

  • Failed: 1551
  • Passed: 24587
  • Skipped: 1561
  • Total: 27699

Test errors: 1551 (only the first 10 test failures are shown below)

Build&Test / filebeat-unitTest / test_container_input – filebeat.tests.system.test_container.Test
    Error: beat.beat.WaitTimeoutError: Timeout waiting for condition 'cond'. Waited 20 seconds

    Stacktrace:

     self = <test_container.Test testMethod=test_container_input>
    
            def test_container_input(self):
                """
                Test container input
                """
                input_raw = """
        - type: container
          paths:
            - {}/logs/*.log
        """
                self.render_config_template(
                    input_raw=input_raw.format(os.path.abspath(self.working_dir)),
                    inputs=False,
                )
        
                os.mkdir(self.working_dir + "/logs/")
                self.copy_files(["logs/docker.log"],
                                target_dir="logs")
        
                filebeat = self.start_beat()
        
    >           self.wait_until(lambda: self.output_has(lines=21))
    
    tests/system/test_container.py:31: 
    _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
    
    self = <test_container.Test testMethod=test_container_input>
    cond = <function Test.test_container_input.<locals>.<lambda> at 0x7f9a924589d0>
    max_timeout = 20, poll_interval = 0.1, name = 'cond', err_msg = ''
    
        def wait_until(self, cond, max_timeout=20, poll_interval=0.1, name="cond", err_msg=""):
            """
            TODO: this can probably be a "wait_until_output_count", among other things, since that could actually use `self`, and this can become an internal function
            Waits until the cond function returns true,
            or until the max_timeout is reached. Calls the cond
            function every poll_interval seconds.
        
            If the max_timeout is reached before cond() returns
            true, an exception is raised.
            """
            start = datetime.now()
            while not cond():
                if datetime.now() - start > timedelta(seconds=max_timeout):
                    print("Test has failed, here are the Beat logs")
                    for l in self.get_log_lines():
                        print(l)
    >               raise WaitTimeoutError(
                        f"Timeout waiting for condition '{name}'. Waited {max_timeout} seconds: {err_msg}")
    E               beat.beat.WaitTimeoutError: Timeout waiting for condition 'cond'. Waited 20 seconds:
    
    ../libbeat/tests/system/beat/beat.py:449: WaitTimeoutError 
    

Build&Test / filebeat-unitTest / test_container_input_registry_for_unparsable_lines – filebeat.tests.system.test_container.Test
    Error: beat.beat.WaitTimeoutError: Timeout waiting for condition 'cond'. Waited 20 seconds

    Stacktrace:

     self = <test_container.Test testMethod=test_container_input_registry_for_unparsable_lines>
    
            def test_container_input_registry_for_unparsable_lines(self):
                """
                Test container input properly updates registry offset in case
                of unparsable lines
                """
                input_raw = """
        - type: container
          paths:
            - {}/logs/*.log
        """
                self.render_config_template(
                    input_raw=input_raw.format(os.path.abspath(self.working_dir)),
                    inputs=False,
                )
        
                os.mkdir(self.working_dir + "/logs/")
                self.copy_files(["logs/docker_corrupted.log"],
                                target_dir="logs")
        
                filebeat = self.start_beat()
        
    >           self.wait_until(lambda: self.output_has(lines=20))
    
    tests/system/test_container.py:91: 
    _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
    
    self = <test_container.Test testMethod=test_container_input_registry_for_unparsable_lines>
    cond = <function Test.test_container_input_registry_for_unparsable_lines.<locals>.<lambda> at 0x7f9a9245bf40>
    max_timeout = 20, poll_interval = 0.1, name = 'cond', err_msg = ''
    
        def wait_until(self, cond, max_timeout=20, poll_interval=0.1, name="cond", err_msg=""):
            """
            TODO: this can probably be a "wait_until_output_count", among other things, since that could actually use `self`, and this can become an internal function
            Waits until the cond function returns true,
            or until the max_timeout is reached. Calls the cond
            function every poll_interval seconds.
        
            If the max_timeout is reached before cond() returns
            true, an exception is raised.
            """
            start = datetime.now()
            while not cond():
                if datetime.now() - start > timedelta(seconds=max_timeout):
                    print("Test has failed, here are the Beat logs")
                    for l in self.get_log_lines():
                        print(l)
    >               raise WaitTimeoutError(
                        f"Timeout waiting for condition '{name}'. Waited {max_timeout} seconds: {err_msg}")
    E               beat.beat.WaitTimeoutError: Timeout waiting for condition 'cond'. Waited 20 seconds:
    
    ../libbeat/tests/system/beat/beat.py:449: WaitTimeoutError 
    

Build&Test / filebeat-pythonIntegTest / test_container_input – filebeat.tests.system.test_container.Test
    Error: beat.beat.WaitTimeoutError: Timeout waiting for condition 'cond'. Waited 20 seconds

    Stacktrace:

     self = <test_container.Test testMethod=test_container_input>
    
            def test_container_input(self):
                """
                Test container input
                """
                input_raw = """
        - type: container
          paths:
            - {}/logs/*.log
        """
                self.render_config_template(
                    input_raw=input_raw.format(os.path.abspath(self.working_dir)),
                    inputs=False,
                )
        
                os.mkdir(self.working_dir + "/logs/")
                self.copy_files(["logs/docker.log"],
                                target_dir="logs")
        
                filebeat = self.start_beat()
        
    >           self.wait_until(lambda: self.output_has(lines=21))
    
    tests/system/test_container.py:31: 
    _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
    
    self = <test_container.Test testMethod=test_container_input>
    cond = <function Test.test_container_input.<locals>.<lambda> at 0x7fb8d42175b0>
    max_timeout = 20, poll_interval = 0.1, name = 'cond', err_msg = ''
    
        def wait_until(self, cond, max_timeout=20, poll_interval=0.1, name="cond", err_msg=""):
            """
            TODO: this can probably be a "wait_until_output_count", among other things, since that could actually use `self`, and this can become an internal function
            Waits until the cond function returns true,
            or until the max_timeout is reached. Calls the cond
            function every poll_interval seconds.
        
            If the max_timeout is reached before cond() returns
            true, an exception is raised.
            """
            start = datetime.now()
            while not cond():
                if datetime.now() - start > timedelta(seconds=max_timeout):
                    print("Test has failed, here are the Beat logs")
                    for l in self.get_log_lines():
                        print(l)
    >               raise WaitTimeoutError(
                        f"Timeout waiting for condition '{name}'. Waited {max_timeout} seconds: {err_msg}")
    E               beat.beat.WaitTimeoutError: Timeout waiting for condition 'cond'. Waited 20 seconds:
    
    ../libbeat/tests/system/beat/beat.py:449: WaitTimeoutError 
    

Build&Test / filebeat-pythonIntegTest / test_container_input_registry_for_unparsable_lines – filebeat.tests.system.test_container.Test
    Error: beat.beat.WaitTimeoutError: Timeout waiting for condition 'cond'. Waited 20 seconds

    Stacktrace:

     self = <test_container.Test testMethod=test_container_input_registry_for_unparsable_lines>
    
            def test_container_input_registry_for_unparsable_lines(self):
                """
                Test container input properly updates registry offset in case
                of unparsable lines
                """
                input_raw = """
        - type: container
          paths:
            - {}/logs/*.log
        """
                self.render_config_template(
                    input_raw=input_raw.format(os.path.abspath(self.working_dir)),
                    inputs=False,
                )
        
                os.mkdir(self.working_dir + "/logs/")
                self.copy_files(["logs/docker_corrupted.log"],
                                target_dir="logs")
        
                filebeat = self.start_beat()
        
    >           self.wait_until(lambda: self.output_has(lines=20))
    
    tests/system/test_container.py:91: 
    _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
    
    self = <test_container.Test testMethod=test_container_input_registry_for_unparsable_lines>
    cond = <function Test.test_container_input_registry_for_unparsable_lines.<locals>.<lambda> at 0x7fb8d53dc040>
    max_timeout = 20, poll_interval = 0.1, name = 'cond', err_msg = ''
    
        def wait_until(self, cond, max_timeout=20, poll_interval=0.1, name="cond", err_msg=""):
            """
            TODO: this can probably be a "wait_until_output_count", among other things, since that could actually use `self`, and this can become an internal function
            Waits until the cond function returns true,
            or until the max_timeout is reached. Calls the cond
            function every poll_interval seconds.
        
            If the max_timeout is reached before cond() returns
            true, an exception is raised.
            """
            start = datetime.now()
            while not cond():
                if datetime.now() - start > timedelta(seconds=max_timeout):
                    print("Test has failed, here are the Beat logs")
                    for l in self.get_log_lines():
                        print(l)
    >               raise WaitTimeoutError(
                        f"Timeout waiting for condition '{name}'. Waited {max_timeout} seconds: {err_msg}")
    E               beat.beat.WaitTimeoutError: Timeout waiting for condition 'cond'. Waited 20 seconds:
    
    ../libbeat/tests/system/beat/beat.py:449: WaitTimeoutError 
    

Build&Test / filebeat-unitTest / test_default_include_exclude_lines – filebeat.tests.system.test_crawler.Test
    Error: beat.beat.WaitTimeoutError: Timeout waiting for condition 'cond'. Waited 15 seconds

    Stacktrace:

     self = <test_crawler.Test testMethod=test_default_include_exclude_lines>
    
        def test_default_include_exclude_lines(self):
            """
            Checks if all the log lines are exported by default
            """
            self.render_config_template(
                path=os.path.abspath(self.working_dir) + "/log/*",
            )
            os.mkdir(self.working_dir + "/log/")
        
            testfile = self.working_dir + "/log/test.log"
            file = open(testfile, 'w')
        
            iterations = 20
            for n in range(0, iterations):
                file.write("DBG: a simple debug message" + str(n))
                file.write("\n")
                file.write("ERR: a simple error message" + str(n))
                file.write("\n")
                file.write("WARNING: a simple warning message" + str(n))
                file.write("\n")
        
            file.close()
        
            filebeat = self.start_beat()
        
    >       self.wait_until(
                lambda: self.output_has(60),
                max_timeout=15)
    
    tests/system/test_crawler.py:672: 
    _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
    
    self = <test_crawler.Test testMethod=test_default_include_exclude_lines>
    cond = <function Test.test_default_include_exclude_lines.<locals>.<lambda> at 0x7f9a9245bc70>
    max_timeout = 15, poll_interval = 0.1, name = 'cond', err_msg = ''
    
        def wait_until(self, cond, max_timeout=20, poll_interval=0.1, name="cond", err_msg=""):
            """
            TODO: this can probably be a "wait_until_output_count", among other things, since that could actually use `self`, and this can become an internal function
            Waits until the cond function returns true,
            or until the max_timeout is reached. Calls the cond
            function every poll_interval seconds.
        
            If the max_timeout is reached before cond() returns
            true, an exception is raised.
            """
            start = datetime.now()
            while not cond():
                if datetime.now() - start > timedelta(seconds=max_timeout):
                    print("Test has failed, here are the Beat logs")
                    for l in self.get_log_lines():
                        print(l)
    >               raise WaitTimeoutError(
                        f"Timeout waiting for condition '{name}'. Waited {max_timeout} seconds: {err_msg}")
    E               beat.beat.WaitTimeoutError: Timeout waiting for condition 'cond'. Waited 15 seconds:
    
    ../libbeat/tests/system/beat/beat.py:449: WaitTimeoutError 
    

Build&Test / filebeat-unitTest / test_exclude_lines – filebeat.tests.system.test_crawler.Test
    Error: beat.beat.WaitTimeoutError: Timeout waiting for condition 'cond'. Waited 15 seconds

    Stacktrace:

     self = <test_crawler.Test testMethod=test_exclude_lines>
    
        def test_exclude_lines(self):
            """
            Checks if the lines matching exclude_lines regexp are dropped
            """
        
            self.render_config_template(
                path=os.path.abspath(self.working_dir) + "/log/*",
                exclude_lines=["^DBG"],
            )
            os.mkdir(self.working_dir + "/log/")
        
            testfile = self.working_dir + "/log/test.log"
            file = open(testfile, 'w')
        
            iterations = 20
            for n in range(0, iterations):
                file.write("DBG: a simple debug message" + str(n))
                file.write("\n")
                file.write("ERR: a simple error message" + str(n))
                file.write("\n")
                file.write("WARNING: a simple warning message" + str(n))
                file.write("\n")
        
            file.close()
        
            filebeat = self.start_beat()
        
    >       self.wait_until(
                lambda: self.output_has(40),
                max_timeout=15)
    
    tests/system/test_crawler.py:710: 
    _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
    
    self = <test_crawler.Test testMethod=test_exclude_lines>
    cond = <function Test.test_exclude_lines.<locals>.<lambda> at 0x7f9a948d2f80>
    max_timeout = 15, poll_interval = 0.1, name = 'cond', err_msg = ''
    
        def wait_until(self, cond, max_timeout=20, poll_interval=0.1, name="cond", err_msg=""):
            """
            TODO: this can probably be a "wait_until_output_count", among other things, since that could actually use `self`, and this can become an internal function
            Waits until the cond function returns true,
            or until the max_timeout is reached. Calls the cond
            function every poll_interval seconds.
        
            If the max_timeout is reached before cond() returns
            true, an exception is raised.
            """
            start = datetime.now()
            while not cond():
                if datetime.now() - start > timedelta(seconds=max_timeout):
                    print("Test has failed, here are the Beat logs")
                    for l in self.get_log_lines():
                        print(l)
    >               raise WaitTimeoutError(
                        f"Timeout waiting for condition '{name}'. Waited {max_timeout} seconds: {err_msg}")
    E               beat.beat.WaitTimeoutError: Timeout waiting for condition 'cond'. Waited 15 seconds:
    
    ../libbeat/tests/system/beat/beat.py:449: WaitTimeoutError 
    

Build&Test / filebeat-unitTest / test_fetched_lines – filebeat.tests.system.test_crawler.Test
    Error: beat.beat.WaitTimeoutError: Timeout waiting for condition 'cond'. Waited 10 seconds

    Stacktrace:

     self = <test_crawler.Test testMethod=test_fetched_lines>
    
        def test_fetched_lines(self):
            """
            Checks if all lines are read from the log file.
            """
        
            self.render_config_template(
                path=os.path.abspath(self.working_dir) + "/log/*",
            )
            os.mkdir(self.working_dir + "/log/")
        
            testfile = self.working_dir + "/log/test.log"
            file = open(testfile, 'w')
        
            iterations = 80
            for n in range(0, iterations):
                file.write("hello world" + str(n))
                file.write("\n")
        
            file.close()
        
            filebeat = self.start_beat()
        
    >       self.wait_until(
                lambda: self.output_has(lines=iterations), max_timeout=10)
    
    tests/system/test_crawler.py:38: 
    _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
    
    self = <test_crawler.Test testMethod=test_fetched_lines>
    cond = <function Test.test_fetched_lines.<locals>.<lambda> at 0x7f9a92399b40>
    max_timeout = 10, poll_interval = 0.1, name = 'cond', err_msg = ''
    
        def wait_until(self, cond, max_timeout=20, poll_interval=0.1, name="cond", err_msg=""):
            """
            TODO: this can probably be a "wait_until_output_count", among other things, since that could actually use `self`, and this can become an internal function
            Waits until the cond function returns true,
            or until the max_timeout is reached. Calls the cond
            function every poll_interval seconds.
        
            If the max_timeout is reached before cond() returns
            true, an exception is raised.
            """
            start = datetime.now()
            while not cond():
                if datetime.now() - start > timedelta(seconds=max_timeout):
                    print("Test has failed, here are the Beat logs")
                    for l in self.get_log_lines():
                        print(l)
    >               raise WaitTimeoutError(
                        f"Timeout waiting for condition '{name}'. Waited {max_timeout} seconds: {err_msg}")
    E               beat.beat.WaitTimeoutError: Timeout waiting for condition 'cond'. Waited 10 seconds:
    
    ../libbeat/tests/system/beat/beat.py:449: WaitTimeoutError 
    

Build&Test / filebeat-unitTest / test_include_exclude_lines – filebeat.tests.system.test_crawler.Test
    Error: beat.beat.WaitTimeoutError: Timeout waiting for condition 'cond'. Waited 15 seconds

    Stacktrace:

     self = <test_crawler.Test testMethod=test_include_exclude_lines>
    
        def test_include_exclude_lines(self):
            """
            Checks if all the log lines are exported by default
            """
        
            self.render_config_template(
                path=os.path.abspath(self.working_dir) + "/log/*",
                exclude_lines=["^DBG"],
                include_lines=["apache"],
            )
            os.mkdir(self.working_dir + "/log/")
        
            testfile = self.working_dir + "/log/test.log"
            file = open(testfile, 'w')
        
            iterations = 20
            for n in range(0, iterations):
                file.write("DBG: a simple debug message" + str(n))
                file.write("\n")
                file.write("ERR: apache simple error message" + str(n))
                file.write("\n")
                file.write("ERR: a simple warning message" + str(n))
                file.write("\n")
        
            file.close()
        
            filebeat = self.start_beat()
        
    >       self.wait_until(
                lambda: self.output_has(20),
                max_timeout=15)
    
    tests/system/test_crawler.py:749: 
    _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
    
    self = <test_crawler.Test testMethod=test_include_exclude_lines>
    cond = <function Test.test_include_exclude_lines.<locals>.<lambda> at 0x7f9a9239a170>
    max_timeout = 15, poll_interval = 0.1, name = 'cond', err_msg = ''
    
        def wait_until(self, cond, max_timeout=20, poll_interval=0.1, name="cond", err_msg=""):
            """
            TODO: this can probably be a "wait_until_output_count", among other things, since that could actually use `self`, and this can become an internal function
            Waits until the cond function returns true,
            or until the max_timeout is reached. Calls the cond
            function every poll_interval seconds.
        
            If the max_timeout is reached before cond() returns
            true, an exception is raised.
            """
            start = datetime.now()
            while not cond():
                if datetime.now() - start > timedelta(seconds=max_timeout):
                    print("Test has failed, here are the Beat logs")
                    for l in self.get_log_lines():
                        print(l)
    >               raise WaitTimeoutError(
                        f"Timeout waiting for condition '{name}'. Waited {max_timeout} seconds: {err_msg}")
    E               beat.beat.WaitTimeoutError: Timeout waiting for condition 'cond'. Waited 15 seconds:
    
    ../libbeat/tests/system/beat/beat.py:449: WaitTimeoutError 
    

Build&Test / filebeat-unitTest / test_include_lines – filebeat.tests.system.test_crawler.Test
    Error: beat.beat.WaitTimeoutError: Timeout waiting for condition 'cond'. Waited 15 seconds

    Stacktrace:

     self = <test_crawler.Test testMethod=test_include_lines>
    
        def test_include_lines(self):
            """
            Checks if only the log lines defined by include_lines are exported
            """
        
            self.render_config_template(
                path=os.path.abspath(self.working_dir) + "/log/*",
                include_lines=["^ERR", "^WARN"],
            )
            os.mkdir(self.working_dir + "/log/")
        
            testfile = self.working_dir + "/log/test.log"
            file = open(testfile, 'w')
        
            iterations = 20
            for n in range(0, iterations):
                file.write("DBG: a simple debug message" + str(n))
                file.write("\n")
                file.write("ERR: a simple error message" + str(n))
                file.write("\n")
                file.write("WARNING: a simple warning message" + str(n))
                file.write("\n")
        
            file.close()
        
            filebeat = self.start_beat()
        
    >       self.wait_until(
                lambda: self.output_has(40),
                max_timeout=15)
    
    tests/system/test_crawler.py:636: 
    _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
    
    self = <test_crawler.Test testMethod=test_include_lines>
    cond = <function Test.test_include_lines.<locals>.<lambda> at 0x7f9a9239b1c0>
    max_timeout = 15, poll_interval = 0.1, name = 'cond', err_msg = ''
    
        def wait_until(self, cond, max_timeout=20, poll_interval=0.1, name="cond", err_msg=""):
            """
            TODO: this can probably be a "wait_until_output_count", among other things, since that could actually use `self`, and this can become an internal function
            Waits until the cond function returns true,
            or until the max_timeout is reached. Calls the cond
            function every poll_interval seconds.
        
            If the max_timeout is reached before cond() returns
            true, an exception is raised.
            """
            start = datetime.now()
            while not cond():
                if datetime.now() - start > timedelta(seconds=max_timeout):
                    print("Test has failed, here are the Beat logs")
                    for l in self.get_log_lines():
                        print(l)
    >               raise WaitTimeoutError(
                        f"Timeout waiting for condition '{name}'. Waited {max_timeout} seconds: {err_msg}")
    E               beat.beat.WaitTimeoutError: Timeout waiting for condition 'cond'. Waited 15 seconds:
    
    ../libbeat/tests/system/beat/beat.py:449: WaitTimeoutError 
    

Build&Test / filebeat-unitTest / test_multiple_appends – filebeat.tests.system.test_crawler.Test
    Error: beat.beat.WaitTimeoutError: Timeout waiting for condition 'cond'. Waited 60 seconds

    Stacktrace:

     self = <test_crawler.Test testMethod=test_multiple_appends>
    
        def test_multiple_appends(self):
            """
            Test that filebeat keeps picking up new lines
            after appending multiple times
            """
        
            self.render_config_template(
                path=os.path.abspath(self.working_dir) + "/log/*",
            )
            os.mkdir(self.working_dir + "/log/")
        
            testfile = self.working_dir + "/log/test.log"
        
            filebeat = self.start_beat()
        
            # Write initial file
            with open(testfile, 'w') as f:
                f.write("hello world\n")
                f.flush()
        
                self.wait_until(
                    lambda: self.output_has(1),
                    max_timeout=60, poll_interval=1)
        
            lines_written = 0
        
            for n in range(3):
                with open(testfile, 'a') as f:
        
                    for i in range(0, 20 + n):
                        f.write("hello world " + str(i) + " " + str(n) + "\n")
                        lines_written = lines_written + 1
        
                    f.flush()
        
    >               self.wait_until(
                        lambda: self.output_has(lines_written + 1),
                        max_timeout=60, poll_interval=1)
    
    tests/system/test_crawler.py:404: 
    _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
    
    self = <test_crawler.Test testMethod=test_multiple_appends>
    cond = <function Test.test_multiple_appends.<locals>.<lambda> at 0x7f9a92399750>
    max_timeout = 60, poll_interval = 1, name = 'cond', err_msg = ''
    
        def wait_until(self, cond, max_timeout=20, poll_interval=0.1, name="cond", err_msg=""):
            """
            TODO: this can probably be a "wait_until_output_count", among other things, since that could actually use `self`, and this can become an internal function
            Waits until the cond function returns true,
            or until the max_timeout is reached. Calls the cond
            function every poll_interval seconds.
        
            If the max_timeout is reached before cond() returns
            true, an exception is raised.
            """
            start = datetime.now()
            while not cond():
                if datetime.now() - start > timedelta(seconds=max_timeout):
                    print("Test has failed, here are the Beat logs")
                    for l in self.get_log_lines():
                        print(l)
    >               raise WaitTimeoutError(
                        f"Timeout waiting for condition '{name}'. Waited {max_timeout} seconds: {err_msg}")
    E               beat.beat.WaitTimeoutError: Timeout waiting for condition 'cond'. Waited 60 seconds:
    
    ../libbeat/tests/system/beat/beat.py:449: WaitTimeoutError 
    

Steps errors: 69 (only the first 10 step failures are shown below)

x-pack/filebeat-windows-2022-windows-2022 - mage build unitTest
  • Took 11 min 40 sec
x-pack/filebeat-windows-2022-windows-2022 - mage build unitTest
  • Took 4 min 27 sec
x-pack/filebeat-windows-2022-windows-2022 - mage build unitTest
  • Took 4 min 26 sec
x-pack/filebeat-windows-2016-windows-2016 - mage build unitTest
  • Took 15 min 21 sec
x-pack/filebeat-windows-2016-windows-2016 - mage build unitTest
  • Took 7 min 31 sec
x-pack/filebeat-windows-2016-windows-2016 - mage build unitTest
  • Took 5 min 33 sec
x-pack/metricbeat-pythonIntegTest - mage pythonIntegTest
  • Took 14 min 26 sec
x-pack/metricbeat-pythonIntegTest - mage pythonIntegTest
  • Took 9 min 32 sec
x-pack/metricbeat-pythonIntegTest - mage pythonIntegTest
  • Took 7 min 12 sec
Error signal
  • Took 0 min 0 sec
  • Description: Error 'hudson.AbortException: script returned exit code 1'

🤖 GitHub comments


To re-run your PR in the CI, just comment with:

  • /test : Re-trigger the build.

  • /package : Generate the packages and run the E2E tests.

  • /beats-tester : Run the installation tests with beats-tester.

  • run elasticsearch-ci/docs : Re-trigger the docs validation. (use unformatted text in the comment!)

@elasticmachine
Copy link
Collaborator

❕ Build Aborted

Either there was a build timeout or someone aborted the build.



Build stats

  • Start Time: 2024-01-31T22:02:10.545+0000

  • Duration: 364 min 31 sec

Test stats 🧪

Test Results
Failed 1458
Passed 24678
Skipped 1564
Total 27700

Test errors 1458

(only the first 10 test failures are shown)

Build&Test / filebeat-pythonIntegTest / test_fileset_file_000_kafka – filebeat.tests.system.test_modules.Test

     FileNotFoundError: [Errno 2] No such file or directory: '/var/lib/jenkins/workspace/PR-37795-2-5c8fd183-bc3a-4734-b627-2c708f4aa278/src/github.com/elastic/beats/filebeat/build/system-tests/run/test_modules.Test.test_fileset_file_000_kafka200/filebeat-20240201.ndjson' 
    


     a = (<test_modules.Test testMethod=test_fileset_file_000_kafka>,)
    
        @wraps(func)
        def standalone_func(*a):
    >       return func(*(a + p.args), **p.kwargs)
    
    ../../../../../python-env/build/ve/linux/lib/python3.10/site-packages/parameterized/parameterized.py:518: 
    _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
    tests/system/test_modules.py:150: in test_fileset_file
        self.run_on_file(
    tests/system/test_modules.py:220: in run_on_file
        self.wait_until(lambda: self.es.indices.exists(self.index_name),
    ../libbeat/tests/system/beat/beat.py:447: in wait_until
        for l in self.get_log_lines():
    _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
    
    self = <test_modules.Test testMethod=test_fileset_file_000_kafka>
    logfile = 'filebeat-20240201.ndjson'
    
        def get_log_lines(self, logfile=None):
            """
            Returns the log lines as a list of strings
            """
            if logfile is None:
                logfile = self.beat_name + "-" + self.today + ".ndjson"
        
    >       with open(os.path.join(self.working_dir, logfile), 'r', encoding="utf_8") as f:
    E       FileNotFoundError: [Errno 2] No such file or directory: '/var/lib/jenkins/workspace/PR-37795-2-5c8fd183-bc3a-4734-b627-2c708f4aa278/src/github.com/elastic/beats/filebeat/build/system-tests/run/test_modules.Test.test_fileset_file_000_kafka200/filebeat-20240201.ndjson'
    
    ../libbeat/tests/system/beat/beat.py:480: FileNotFoundError 
    

Build&Test / filebeat-pythonIntegTest / test_fileset_file_001_kafka through test_fileset_file_007_kafka, test_fileset_file_008_system, test_fileset_file_009_system – filebeat.tests.system.test_modules.Test

     The remaining nine displayed failures are identical: each raises FileNotFoundError because the expected log file filebeat-20240201.ndjson does not exist in that test's working directory (raised from get_log_lines at ../libbeat/tests/system/beat/beat.py:480 while wait_until was dumping the Beat logs).

Steps errors 71

(only the first 10 step failures are shown)

x-pack/filebeat-windows-2022-windows-2022 - mage build unitTest
  • Took 11 min 49 sec . View more details here
  • Description: mage build unitTest
x-pack/filebeat-windows-2022-windows-2022 - mage build unitTest
  • Took 5 min 32 sec . View more details here
  • Description: mage build unitTest
x-pack/filebeat-windows-2022-windows-2022 - mage build unitTest
  • Took 4 min 28 sec . View more details here
  • Description: mage build unitTest
x-pack/filebeat-windows-2016-windows-2016 - mage build unitTest
  • Took 11 min 48 sec . View more details here
  • Description: mage build unitTest
x-pack/filebeat-windows-2016-windows-2016 - mage build unitTest
  • Took 5 min 30 sec . View more details here
  • Description: mage build unitTest
x-pack/filebeat-windows-2016-windows-2016 - mage build unitTest
  • Took 5 min 31 sec . View more details here
  • Description: mage build unitTest
x-pack/metricbeat-pythonIntegTest - mage pythonIntegTest
  • Took 14 min 23 sec . View more details here
  • Description: mage pythonIntegTest
x-pack/metricbeat-pythonIntegTest - mage pythonIntegTest
  • Took 7 min 28 sec . View more details here
  • Description: mage pythonIntegTest
x-pack/metricbeat-pythonIntegTest - mage pythonIntegTest
  • Took 7 min 59 sec . View more details here
  • Description: mage pythonIntegTest
Error signal
  • Took 0 min 0 sec . View more details here
  • Description: Error 'hudson.AbortException: script returned exit code 1'


@elasticmachine
Copy link
Collaborator

elasticmachine commented Feb 1, 2024

💚 Build Succeeded



Build stats

  • Start Time: 2024-02-13T15:17:40.295+0000

  • Duration: 134 min 34 sec

Test stats 🧪

Test Results
Failed 0
Passed 28917
Skipped 2022
Total 30939

💚 Flaky test report

Tests succeeded.


Copy link
Contributor

mergify bot commented Feb 5, 2024

This pull request does not have a backport label.
If this is a bug or security fix, could you label this PR @faec? 🙏.
For such, you'll need to label your PR with:

  • The upcoming major version of the Elastic Stack
  • The upcoming minor version of the Elastic Stack (if you're not pushing a breaking change)

To fixup this pull request, you need to add the backport labels for the needed
branches, such as:

  • backport-v8./d.0 is the label to automatically backport to the 8./d branch. /d is the digit

@faec faec marked this pull request as ready for review February 5, 2024 21:27
@faec faec requested a review from a team as a code owner February 5, 2024 21:27
@faec faec requested review from belimawr and rdner February 5, 2024 21:27
@elasticmachine
Copy link
Collaborator

Pinging @elastic/elastic-agent (Team:Elastic-Agent)

@faec faec requested a review from leehinman February 5, 2024 21:28
Copy link
Contributor

@belimawr belimawr left a comment


Great PR @faec!

I confess it was a bit hard to know where to start reviewing it because I'm not familiar with the queue. Looking at runloop.go first helped me.

One thing I'm missing is some test to ensure the new behaviour, you mention it in the proposed commit message:

Simplify the behavior of queue.mem.flush.min_events so it can't starve output workers when enough data is ready (previously it interacted in complicated ways with bulk_max_size, now it serves as a simple maximum on allowable event batch size).

But I didn't see a test ensuring it actually works; ideally, a test that would fail with the old behaviour and pass with the new one.

libbeat/publisher/queue/memqueue/config.go
libbeat/publisher/queue/memqueue/runloop.go (Outdated)
libbeat/publisher/queue/memqueue/runloop.go (Outdated)
libbeat/publisher/queue/memqueue/runloop.go
libbeat/publisher/queue/memqueue/runloop.go (Outdated)
libbeat/publisher/queue/memqueue/runloop.go (Outdated)
libbeat/publisher/queue/memqueue/broker.go (Outdated)
libbeat/publisher/queue/memqueue/broker.go
@belimawr
Copy link
Contributor

belimawr commented Feb 6, 2024

@faec is there an easy-to-reproduce test scenario where we can see the improvements introduced by this PR? I've tried a few different settings following the scenarios from #37757, but I didn't notice any difference :/

@faec
Copy link
Contributor Author

faec commented Feb 6, 2024

is there an easy-to-reproduce test scenario where we can see the improvements introduced by this PR? I've tried a few different settings following the scenarios from #37757, but I didn't notice any difference :/

It's hard to test the worst behavior without a live input like s3 that will provide the pushback that caused the issue. The performance is expected to be quite similar in most cases, and without active pushback from an input you will mostly just see increased latency, as the input will eventually reach equilibrium and keep up with ingestion. But here's a contrived way to test it:

  • Set queue.mem.flush.timeout to some unreasonably large value like 5m
  • Set bulk_max_size to (say) 100 and worker to (say) 8.
  • Point filestream at an input file that has 1000 events.

In base 8.16, this should stall for 5 minutes (because it is waiting to accumulate a buffer of 1600 events) and then ingest everything at once. With this PR applied, it should instead ingest almost everything immediately and only delay for the last <100 events if the remainder isn't exact.
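The repro steps above can be expressed as a minimal standalone config sketch. The input path, input id, and Elasticsearch host are placeholders, not taken from the PR; only the three settings named in the comment matter:

```yaml
# Hypothetical repro config for the stall described above.
queue.mem:
  flush.timeout: 5m            # unreasonably large on purpose

filebeat.inputs:
  - type: filestream
    id: repro-input            # placeholder id
    paths:
      - /tmp/repro-1000-events.log   # placeholder file containing 1000 events

output.elasticsearch:
  hosts: ["localhost:9200"]    # placeholder host
  worker: 8
  bulk_max_size: 100
```

With the default flush.min_events of 1600, the old code waits the full 5 minutes before flushing the 1000 buffered events; the patched code hands out 100-event batches as soon as they are available.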

@cmacknz
Copy link
Member

cmacknz commented Feb 6, 2024

The documentation for the queue at https://github.com/elastic/beats/blob/main/libbeat/docs/queueconfig.asciidoc looks like it needs to be updated.

@cmacknz
Copy link
Member

cmacknz commented Feb 6, 2024

We also need a changelog entry describing this in a way a user would understand.

@strawgate
Copy link
Contributor

strawgate commented Feb 7, 2024

Set queue.mem.flush.timeout to some unreasonably large value like 5m
Set bulk_max_size to (say) 100 and worker to (say) 8.
Point filestream at an input file that has 1000 events.

What are the advantages of the implementation in this PR vs. just deprecating the user-facing min_events setting (making it no longer user configurable) and having the internal value mirror whatever value is used for bulk_max_size?

@faec
Copy link
Contributor Author

faec commented Feb 8, 2024

What are the advantages of the implementation in this PR vs just deprecating the user facing min_events setting (making it no longer user configurable) and just having the internal value mirror whatever value is used for max bulk size?

Mainly backwards compatibility: some users have used flush.min_events to set the batch size, and set bulk_max_size to 0 which reads all available events (or similarly any configuration where flush.min_events is significantly lower than bulk_max_size or its default). In that case, fully deprecating the parameter would try to publish much larger batches than the user had configured.
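The compatibility rule described above can be sketched as a small function. This is an illustration of the stated behavior, not code from the PR: flush.min_events acts as a cap on batch size, and bulk_max_size == 0 means "read all available events", in which case the cap is what actually limits the batch.

```go
package main

import "fmt"

// effectiveBatchSize is an illustrative sketch (not the PR's code) of the
// described compatibility behavior: flush.min_events caps the batch size,
// and a bulk_max_size of 0 means "take everything available", so the cap
// is the only limit in that case.
func effectiveBatchSize(bulkMaxSize, flushMinEvents int) int {
	if bulkMaxSize <= 0 || bulkMaxSize > flushMinEvents {
		return flushMinEvents
	}
	return bulkMaxSize
}

func main() {
	// User relied on flush.min_events as the batch size, with bulk_max_size: 0.
	fmt.Println(effectiveBatchSize(0, 1600)) // prints 1600
	// Typical case: the smaller bulk_max_size wins.
	fmt.Println(effectiveBatchSize(100, 1600)) // prints 100
}
```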

Copy link
Contributor

@leehinman leehinman left a comment


A couple of questions, but I don't think they are blockers for the PR.


chanSize := AdjustInputQueueSize(inputQueueSize, sz)
// Start the queue workers
b.wg.Add(2)
Copy link
Contributor


Not sure if this would ever happen, but since we added 2 here, is it possible for the run methods to block on each other in any way?

Copy link
Contributor Author


No, their only communication is from runLoop to ackLoop via consumedChan, and both sender and receiver are in select statements that include the queue's global context.
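The deadlock-free pattern described in this answer can be sketched as two goroutines where both the send and the receive side select on a shared shutdown context. The names runLoop, ackLoop, and consumedChan mirror the PR; everything else here is illustrative, not the queue's actual logic:

```go
package main

import (
	"context"
	"fmt"
)

// runLoop hands consumed batch IDs to ackLoop over consumedChan. Both
// sides also select on ctx.Done(), so neither goroutine can block the
// other once the queue's global context is cancelled.
func runLoop(ctx context.Context, consumedChan chan<- int) {
	for batchID := 0; batchID < 3; batchID++ {
		select {
		case consumedChan <- batchID:
		case <-ctx.Done():
			return
		}
	}
}

func ackLoop(ctx context.Context, consumedChan <-chan int, done chan<- []int) {
	var acked []int
	for len(acked) < 3 {
		select {
		case id := <-consumedChan:
			acked = append(acked, id)
		case <-ctx.Done():
			done <- acked
			return
		}
	}
	done <- acked
}

func runScenario() []int {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	consumedChan := make(chan int)
	done := make(chan []int)
	go runLoop(ctx, consumedChan)
	go ackLoop(ctx, consumedChan, done)
	return <-done
}

func main() {
	fmt.Println(runScenario()) // prints [0 1 2]
}
```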

// (Otherwise, it would make sense to leave FlushTimeout unchanged here.)
if settings.MaxGetRequest <= 1 {
settings.FlushTimeout = 0
settings.MaxGetRequest = (settings.Events + 1) / 2
Contributor


Nit: Events and MaxGetRequest are ints, so they could be negative, which could lead to some interesting results. Do we have protections to make sure these are positive values? Or maybe switch to uint?

Contributor Author


Yes, Events has a min of 32 and MaxGetRequest has a min of 0 enforced by the config parser.

@elasticmachine
Collaborator

💚 Build Succeeded

History

cc @faec


@elasticmachine
Collaborator

elasticmachine commented Feb 13, 2024

💛 Build succeeded, but was flaky

Failed CI Steps

History

cc @faec


@belimawr
Contributor

Is there any easy-to-reproduce test scenario where we can see the improvements introduced by this PR? I've tried a few different settings following the scenarios from #37757, but I didn't notice any difference :/

It's hard to test the worst behavior without a live input like s3 that will provide the pushback that caused the issue. The performance is expected to be quite similar in most cases, and without active pushback from an input you will mostly just see increased latency, as the input will eventually reach equilibrium and keep up with ingestion. But here's a contrived way to test it:

  • Set queue.mem.flush.timeout to some unreasonably large value like 5m
  • Set bulk_max_size to (say) 100 and worker to (say) 8.
  • Point filestream at an input file that has 1000 events.

In base 8.16, this should stall for 5 minutes (because it is waiting to accumulate a buffer of 1600 events) and then ingest everything at once. With this PR applied, it should instead ingest almost everything immediately and only delay for the last <100 events if the remainder isn't exact.

Thank you @faec! It was super easy to test with those values ❤️

For reference in case someone else also wants to test

filebeat.yml

filebeat.inputs:
- type: filestream
  id: my-filestream-id
  enabled: true
  paths:
    - /tmp/flog.log

queue.mem:
  events: 4096 # bigger than the number of events in the file
  flush:
    timeout: 30s
    min_events: 1600 # bigger than the number of events in the file

output.elasticsearch:
  enabled: true
  hosts: localhost:9200
  protocol: http
  username: admin
  password: testing
  bulk_max_size: 100
  workers: 2

logging:
  level: debug
  to_stderr: true
  selectors:
    - elasticsearch
    - eslegclient

Building from main, Filebeat stalls for 30s right after starting the input.

{"log.level":"info","@timestamp":"2024-02-13T19:04:48.574+0100","log.logger":"crawler","log.origin":{"function":"github.com/elastic/beats/v7/filebeat/beater.(*crawler).startInput","file.name":"beater/crawler.go","file.line":148},"message":"Starting input (ID: 16503871974922524527)","service.name":"filebeat","ecs.version":"1.6.0"}
{"log.level":"info","@timestamp":"2024-02-13T19:04:48.575+0100","log.logger":"crawler","log.origin":{"function":"github.com/elastic/beats/v7/filebeat/beater.(*crawler).Start","file.name":"beater/crawler.go","file.line":106},"message":"Loading and starting Inputs completed. Enabled inputs: 1","service.name":"filebeat","ecs.version":"1.6.0"}
{"log.level":"info","@timestamp":"2024-02-13T19:04:48.575+0100","log.logger":"input.filestream","log.origin":{"function":"github.com/elastic/beats/v7/filebeat/input/v2/compat.(*runner).Start.func1","file.name":"compat/compat.go","file.line":121},"message":"Input 'filestream' starting","service.name":"filebeat","id":"my-filestream-id","ecs.version":"1.6.0"}
{"log.level":"info","@timestamp":"2024-02-13T19:04:48.575+0100","log.logger":"metric_registry","log.origin":{"function":"github.com/elastic/beats/v7/libbeat/monitoring/inputmon.NewInputRegistry","file.name":"inputmon/input.go","file.line":63},"message":"registering","service.name":"filebeat","input_type":"filestream","id":"my-filestream-id","key":"my-filestream-id","uuid":"ce41369e-4adf-4d8f-b0a1-4e12376a32d4","ecs.version":"1.6.0"}

Once the queue flush timeout is reached, we can see the output working

{"log.level":"info","@timestamp":"2024-02-13T19:05:18.577+0100","log.logger":"esclientleg","log.origin":{"function":"github.com/elastic/beats/v7/libbeat/esleg/eslegclient.(*Connection).Ping","file.name":"eslegclient/connection.go","file.line":304},"message":"Attempting to connect to Elasticsearch version 8.13.0-SNAPSHOT (default)","service.name":"filebeat","ecs.version":"1.6.0"}
{"log.level":"info","@timestamp":"2024-02-13T19:05:18.578+0100","log.logger":"esclientleg","log.origin":{"function":"github.com/elastic/beats/v7/libbeat/esleg/eslegclient.(*Connection).Ping","file.name":"eslegclient/connection.go","file.line":304},"message":"Attempting to connect to Elasticsearch version 8.13.0-SNAPSHOT (default)","service.name":"filebeat","ecs.version":"1.6.0"}

...

{"log.level":"debug","@timestamp":"2024-02-13T19:05:18.606+0100","log.logger":"elasticsearch","log.origin":{"function":"github.com/elastic/beats/v7/libbeat/outputs/elasticsearch.(*Client).publishEvents","file.name":"elasticsearch/client.go","file.line":268},"message":"PublishEvents: 100 events have been published to elasticsearch in 24.403962ms.","service.name":"filebeat","ecs.version":"1.6.0"}
{"log.level":"debug","@timestamp":"2024-02-13T19:05:18.640+0100","log.logger":"elasticsearch","log.origin":{"function":"github.com/elastic/beats/v7/libbeat/outputs/elasticsearch.(*Client).publishEvents","file.name":"elasticsearch/client.go","file.line":268},"message":"PublishEvents: 100 events have been published to elasticsearch in 34.017819ms.","service.name":"filebeat","ecs.version":"1.6.0"}
{"log.level":"debug","@timestamp":"2024-02-13T19:05:18.654+0100","log.logger":"elasticsearch","log.origin":{"function":"github.com/elastic/beats/v7/libbeat/outputs/elasticsearch.(*Client).publishEvents","file.name":"elasticsearch/client.go","file.line":268},"message":"PublishEvents: 100 events have been published to elasticsearch in 14.020265ms.","service.name":"filebeat","ecs.version":"1.6.0"}
{"log.level":"debug","@timestamp":"2024-02-13T19:05:18.669+0100","log.logger":"elasticsearch","log.origin":{"function":"github.com/elastic/beats/v7/libbeat/outputs/elasticsearch.(*Client).publishEvents","file.name":"elasticsearch/client.go","file.line":268},"message":"PublishEvents: 100 events have been published to elasticsearch in 14.51131ms.","service.name":"filebeat","ecs.version":"1.6.0"}
{"log.level":"debug","@timestamp":"2024-02-13T19:05:18.688+0100","log.logger":"elasticsearch","log.origin":{"function":"github.com/elastic/beats/v7/libbeat/outputs/elasticsearch.(*Client).publishEvents","file.name":"elasticsearch/client.go","file.line":268},"message":"PublishEvents: 100 events have been published to elasticsearch in 18.719864ms.","service.name":"filebeat","ecs.version":"1.6.0"}
{"log.level":"debug","@timestamp":"2024-02-13T19:05:18.703+0100","log.logger":"elasticsearch","log.origin":{"function":"github.com/elastic/beats/v7/libbeat/outputs/elasticsearch.(*Client).publishEvents","file.name":"elasticsearch/client.go","file.line":268},"message":"PublishEvents: 100 events have been published to elasticsearch in 14.74089ms.","service.name":"filebeat","ecs.version":"1.6.0"}
{"log.level":"debug","@timestamp":"2024-02-13T19:05:18.716+0100","log.logger":"elasticsearch","log.origin":{"function":"github.com/elastic/beats/v7/libbeat/outputs/elasticsearch.(*Client).publishEvents","file.name":"elasticsearch/client.go","file.line":268},"message":"PublishEvents: 100 events have been published to elasticsearch in 13.112544ms.","service.name":"filebeat","ecs.version":"1.6.0"}
{"log.level":"debug","@timestamp":"2024-02-13T19:05:18.727+0100","log.logger":"elasticsearch","log.origin":{"function":"github.com/elastic/beats/v7/libbeat/outputs/elasticsearch.(*Client).publishEvents","file.name":"elasticsearch/client.go","file.line":268},"message":"PublishEvents: 100 events have been published to elasticsearch in 11.490332ms.","service.name":"filebeat","ecs.version":"1.6.0"}
{"log.level":"debug","@timestamp":"2024-02-13T19:05:18.739+0100","log.logger":"elasticsearch","log.origin":{"function":"github.com/elastic/beats/v7/libbeat/outputs/elasticsearch.(*Client).publishEvents","file.name":"elasticsearch/client.go","file.line":268},"message":"PublishEvents: 100 events have been published to elasticsearch in 11.408433ms.","service.name":"filebeat","ecs.version":"1.6.0"}
{"log.level":"debug","@timestamp":"2024-02-13T19:05:18.751+0100","log.logger":"elasticsearch","log.origin":{"function":"github.com/elastic/beats/v7/libbeat/outputs/elasticsearch.(*Client).publishEvents","file.name":"elasticsearch/client.go","file.line":268},"message":"PublishEvents: 100 events have been published to elasticsearch in 12.236254ms.","service.name":"filebeat","ecs.version":"1.6.0"}

Building from this PR, it publishes everything right away 🚀

@faec faec merged commit ecfb88f into elastic:main Feb 13, 2024
106 checks passed
@faec faec deleted the memqueue-ringbuf branch February 13, 2024 18:50
@faec faec mentioned this pull request Mar 22, 2024
6 tasks
faec added a commit that referenced this pull request Apr 5, 2024
Delete the proxy queue, a prototype written to reduce memory use in the old shipper project. Recent improvements to the memory queue (#37795, #38166) added support for the same early-free mechanisms as the proxy queue, so it is now redundant.

The proxy queue was never used or exposed in a public release, so there are no compatibility concerns.

(This is pre-cleanup for adding early-encoding support, to avoid implementing new functionality in a queue that is no longer used.)
zeynepyz pushed a commit to zeynepyz/beats that referenced this pull request Apr 7, 2024
Delete the proxy queue, a prototype written to reduce memory use in the old shipper project. Recent improvements to the memory queue (elastic#37795, elastic#38166) added support for the same early-free mechanisms as the proxy queue, so it is now redundant.

The proxy queue was never used or exposed in a public release, so there are no compatibility concerns.

(This is pre-cleanup for adding early-encoding support, to avoid implementing new functionality in a queue that is no longer used.)
Successfully merging this pull request may close these issues.

Queue flush settings interact badly with output's bulk_max_size
6 participants